Cmd 1
1
xxxxxxxxxx21
!pip install pyspark!pip install findsparkimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as snsfrom pyspark.context import SparkContextfrom pyspark.sql.session import SparkSessionfrom pyspark.ml.feature import StringIndexerfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.sql.functions import isnull, when, count, colfrom pyspark.ml.evaluation import BinaryClassificationEvaluatorfrom pyspark.ml.classification import LogisticRegression# importsfrom subprocess import check_outputfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import (count, col)from pyspark.ml import Pipelinefrom pyspark.sql.functions import col,isnan, when, countCollecting pyspark
Using cached pyspark-3.4.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.0
WARNING: You are using pip version 21.2.4; however, version 23.1.1 is available.
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-f457b28d-21e2-4dbf-98a7-956bff0b63bb/bin/python -m pip install --upgrade pip' command.
Collecting findspark
Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
WARNING: You are using pip version 21.2.4; however, version 23.1.1 is available.
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-f457b28d-21e2-4dbf-98a7-956bff0b63bb/bin/python -m pip install --upgrade pip' command.
Command took 9.21 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:13:25 PM on My Cluster
Cmd 2
Cmd 4
# Rename the raw Adult Census columns to clean, lower-case names.
# fix: the original used functools.reduce over range(len(oldColumns)) with an
# index-based lambda — a zip loop does the same renames, pair by pair, and is
# far more readable.
oldColumns = df.schema.names
newColumns = [
    "age",
    "workclass",
    "finalweight",
    "education",
    "maritalstatus",
    "occupation",
    "relationship",
    "race",
    "sex",
    "capitalgain",
    "capitalloss",
    "hoursperweek",
    "nativecountry",
    "income",
]
for old_name, new_name in zip(oldColumns, newColumns):
    df = df.withColumnRenamed(old_name, new_name)
df.printSchema()
|-- age: string (nullable = true)
|-- workclass: string (nullable = true)
|-- finalweight: string (nullable = true)
|-- education: string (nullable = true)
|-- maritalstatus: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capitalgain: string (nullable = true)
|-- capitalloss: string (nullable = true)
|-- hoursperweek: string (nullable = true)
|-- nativecountry: string (nullable = true)
|-- income: string (nullable = true)
Command took 0.22 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:14:36 PM on My Cluster
Cmd 6
from pyspark.sql.functions import col

# The CSV read leaves every column as string; cast the numeric ones to
# integers (same columns, same order as before).
for int_column in ("hoursperweek", "age", "finalweight", "capitalgain", "capitalloss"):
    df = df.withColumn(int_column, col(int_column).cast("integer"))
Cmd 8
df.printSchema()
root
|-- age: integer (nullable = true)
|-- workclass: string (nullable = true)
|-- finalweight: integer (nullable = true)
|-- education: string (nullable = true)
|-- maritalstatus: string (nullable = true)
|-- occupation: string (nullable = true)
|-- relationship: string (nullable = true)
|-- race: string (nullable = true)
|-- sex: string (nullable = true)
|-- capitalgain: integer (nullable = true)
|-- capitalloss: integer (nullable = true)
|-- hoursperweek: integer (nullable = true)
|-- nativecountry: string (nullable = true)
|-- income: string (nullable = true)
Command took 0.06 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:14:55 PM on My Cluster
Cmd 9
# Binary-encode the income label: 0 for "<=50K", 1 for everything else (">50K").
# fix: the original comment wrongly said "GENDER where 0 = Female, 1 = Male" —
# this cell encodes income, not sex.
# NOTE(review): raw Adult Census values often carry a leading space (" <=50K");
# confirm values are trimmed upstream, otherwise every row maps to 1.
from pyspark.sql.functions import when

df = df.withColumn("income", when(df["income"] == "<=50K", 0).otherwise(1))

# Sanity check: only the two encoded values should remain.
df.select("income").distinct().show()
+------+
|income|
+------+
| 1|
| 0|
+------+
Command took 0.88 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:14:57 PM on My Cluster
Cmd 11
from pyspark.sql.functions import col

# Restrict to private-sector workers, then inspect the high-income subset.
new_df = df.filter(col('workclass') == 'Private')
high_income = new_df.filter(col('income') == 1)
print(high_income.count())
high_income.show(2)
4876
+---+---------+-----------+---------+-------------+--------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
|age|workclass|finalweight|education|maritalstatus| occupation| relationship| race| sex|capitalgain|capitalloss|hoursperweek|nativecountry|income|
+---+---------+-----------+---------+-------------+--------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
| 45| Private| 172274|Doctorate| Divorced|Prof-specialty| Unmarried|Black|Female| 0| 3004| 35|United-States| 1|
| 52| Private| 129177|Bachelors| Widowed| Other-service|Not-in-family|White|Female| 0| 2824| 20|United-States| 1|
+---+---------+-----------+---------+-------------+--------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
only showing top 2 rows
Command took 1.29 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:15:05 PM on My Cluster
Cmd 12
Cmd 13
# Import required libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt

# Keep only the numeric columns for the correlation computation.
numeric_cols = [name for name, dtype in df.dtypes if dtype in ('double', 'int')]

# Correlation.corr expects a single vector column, so pack the numeric
# columns together first.
assembler = VectorAssembler(inputCols=numeric_cols, outputCol='features')
assembled_df = assembler.transform(df).select('features')

# Pearson correlation matrix over the assembled feature vector.
correlation_matrix = Correlation.corr(assembled_df, 'features').head()

# Move the matrix into pandas so seaborn can plot it with labeled axes.
corr_matrix = pd.DataFrame(
    correlation_matrix[0].toArray(),
    columns=numeric_cols,
    index=numeric_cols,
)

# Plot the heatmap.
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(data=corr_matrix, annot=True, cmap='viridis')
ax.set_title('Adult Census Correlation Matrix', fontsize=15);
(4) Spark Jobs
Command took 3.81 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:15:10 PM on My Cluster
Cmd 14
# Keep only the high-income rows (income encoded as 1).
df_filtered = df.filter(df.income == 1)

# Pull just the age column into pandas for plotting.
pandas_df = df_filtered.select('age').toPandas()

# Histogram of ages among the high earners.
sns.histplot(data=pandas_df, x='age', bins=20)
plt.title('Age Distribution for Income > 50K')
plt.xlabel('Age')
plt.ylabel('Frequency')
plt.show()
(1) Spark Jobs
Command took 0.95 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:15:17 PM on My Cluster
Cmd 15
# Keep only the high-income rows (income encoded as 1).
df_filtered = df.filter(df.income == 1)

# Count high earners per work class and bring the result into pandas.
pandas_df = df_filtered.groupBy('workclass').count().toPandas()

# Pie chart of the work-class distribution.
plt.figure(figsize=(6, 6))
plt.pie(pandas_df['count'], labels=pandas_df['workclass'],
        autopct='%1.1f%%', textprops={'fontsize': 8})
plt.title('Work Class Distribution for Income > 50K', fontsize=14)
plt.show()
(2) Spark Jobs
Command took 1.41 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:15:20 PM on My Cluster
Cmd 16
# Project the two columns of interest into pandas for plotting.
pdf = df.select(col("age"), col("hoursperweek")).toPandas()

# Scatter plot: age against weekly working hours.
sns.scatterplot(x="age", y="hoursperweek", data=pdf)
plt.xlabel('Age')
plt.ylabel('Hours per Week')
plt.title('Age vs. Hours per Week')
plt.show()
(1) Spark Jobs
Command took 0.80 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:15:25 PM on My Cluster
Cmd 17
import plotly.express as px

# fix: the original cell had all statements fused onto one line, which is a
# SyntaxError in Python — split into proper statements.
# Interactive scatter of age vs. weekly hours, colored by income, with
# education/occupation shown on hover.
fig = px.scatter(df.toPandas(), x="age", y="hoursperweek", color="income",
                 hover_data=['education', 'occupation'])
fig.update_layout(title='Age vs. Hours per Week', xaxis_title='Age',
                  yaxis_title='Hours per Week')
fig.show()
(1) Spark Jobs
Command took 2.43 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:15:29 PM on My Cluster
Cmd 18
import plotly.express as px

# Interactive age histogram split by income, with a marginal box plot and
# education/occupation on hover.
fig = px.histogram(
    df.toPandas(),
    x="age",
    color="income",
    nbins=30,
    marginal="box",
    hover_data=['education', 'occupation'],
)
fig.update_layout(title='Distribution of Age by Income', xaxis_title='Age')
fig.show()
(1) Spark Jobs
Command took 0.97 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:30:36 PM on My Cluster
Cmd 19
import plotly.express as px

# fix: income was re-encoded to 0/1 earlier in this notebook, so a color map
# keyed on '<=50K'/'>50K' never matched any value and the custom colors were
# silently ignored. Map the codes back to readable labels first.
pdf = df.toPandas()
pdf['income'] = pdf['income'].map({0: '<=50K', 1: '>50K'})

# Define custom colors for the histogram bars
colors = {'<=50K': 'rgb(31, 100, 180)', '>50K': 'rgb(255, 117, 14)'}
fig = px.histogram(pdf, x="age", color="income", nbins=20,
                   marginal="box", hover_data=['education', 'occupation'],
                   color_discrete_map=colors)
fig.update_layout(title='Distribution of Age by Income', xaxis_title='Age')
fig.show()
(1) Spark Jobs
Command took 0.90 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:19:12 PM on My Cluster
Cmd 20
import plotly.express as px

# Violin plot of age per income class, with a box overlay and all raw points.
fig = px.violin(
    df.toPandas(),
    y="age",
    x="income",
    box=True,
    points="all",
    hover_data=['education', 'occupation'],
)
fig.update_layout(title='Distribution of Age by Income', yaxis_title='Age')
fig.show()
(1) Spark Jobs
Command took 1.07 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:21:56 PM on My Cluster
Cmd 21
# Count high earners per occupation, most common first.
# NOTE: df_filtered comes from an earlier cell (the income == 1 subset).
pandas_df = (
    df_filtered.groupBy('occupation')
    .count()
    .sort('count', ascending=False)
    .toPandas()
)

# Line graph of the occupation counts.
sns.lineplot(data=pandas_df, x='occupation', y='count')
plt.title('Occupation Distribution for Income > 50K')
plt.xlabel('Occupation')
plt.ylabel('Count')
plt.xticks(rotation=90)
plt.show()
Command took 13.47 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:13:25 PM on My Cluster
Cmd 22
# Count high earners per education level.
# NOTE: df_filtered comes from an earlier cell (the income == 1 subset).
# fix: the original called F.desc(...) but `F` was never imported anywhere in
# this notebook, so the cell raised NameError (it shows as "Command skipped").
from pyspark.sql.functions import desc

grouped_df = df_filtered.groupBy('education').count()
# sort the DataFrame by count in descending order
grouped_df = grouped_df.sort(desc('count'))
# convert the PySpark DataFrame to a Pandas DataFrame
pandas_df = grouped_df.toPandas()

# create a bar graph of education using seaborn
sns.barplot(data=pandas_df, x='education', y='count')
plt.title('Education Distribution for Income > 50K')
plt.xlabel('Education')
plt.ylabel('Count')
# rotate the x-axis labels by 90 degrees so long labels stay readable
plt.xticks(rotation=90)
plt.show()
Command skipped
Command took 13.48 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:13:25 PM on My Cluster
Cmd 23
import seaborn as sns

# filter the DataFrame to only include those with income equal to 1
df_filtered = df.filter(df.income == 1)

# Box plot of weekly hours split by sex.
sns.boxplot(x='sex', y='hoursperweek', data=df_filtered.toPandas())

# fix: the title and x-label previously said "Race", but the plotted x-axis
# column is `sex` — labels now match the data.
plt.title('Hours per Week by Sex for Income > 50K')
plt.xlabel('Sex')
plt.ylabel('Hours per Week')
plt.show()
Command took 13.50 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:13:25 PM on My Cluster
Cmd 25
# Average weekly hours per marital status, among high earners.
# fix: the original used F.col / F.avg but `F` was never imported anywhere in
# this notebook, so the cell raised NameError (it shows as "Command skipped").
from pyspark.sql import functions as F

# filter the DataFrame for rows where income is 1
df_filtered = df.filter(F.col('income') == 1)

# group the data by marital status and calculate the average hours per week
df_grouped = df_filtered.groupBy('maritalstatus').agg(
    F.avg('hoursperweek').alias('avghours')
)

# convert the PySpark DataFrame to a Pandas DataFrame for plotting
df_pandas = df_grouped.toPandas()

# create a bar chart using matplotlib
plt.bar(df_pandas['maritalstatus'], df_pandas['avghours'])
plt.xticks(rotation=90)
plt.xlabel('Marital Status')
plt.ylabel('Average Hours per Week')
plt.title('Average Hours per Week by Marital Status (Income >50K)')
plt.show()
Command skipped
Command took 13.49 seconds -- by smoturu@gmu.edu at 4/23/2023, 10:13:25 PM on My Cluster
Shift+Enter to run
Shift+Ctrl+Enter to run selected text
Shift+Ctrl+Enter to run selected text
